AutoShardClient
class
AutoShardedClient (Client)
¶
A client to automatically shard the bot.
You can optionally specify the total number of shards to start with, or it will be determined automatically.
Source code in naff/client/auto_shard_client.py
class AutoShardedClient(Client):
    """
    A client to automatically shard the bot.

    You can optionally specify the total number of shards to start with, or it will be determined automatically.

    """

    def __init__(self, *args, **kwargs) -> None:
        """
        Create the client; all arguments are forwarded unchanged to `Client`.

        Auto-sharding is enabled unless `total_shards` is passed as a keyword argument.
        """
        self.auto_sharding = "total_shards" not in kwargs

        super().__init__(*args, **kwargs)

        # Shards are tracked in `_connection_states` (filled in by `login`);
        # the single connection state is cleared.
        self._connection_state = None
        self._connection_states: list[ConnectionState] = []
        self.max_start_concurrency: int = 1

    @property
    def gateway_started(self) -> bool:
        """Returns if the gateway has been started in all shards."""
        return all(state.gateway_started.is_set() for state in self._connection_states)

    @property
    def shards(self) -> list[ConnectionState]:
        """Returns a list of all shards currently in use."""
        return self._connection_states

    @property
    def latency(self) -> float:
        """The average latency of all active gateways, or infinity when there are no shards."""
        if self._connection_states:
            latencies = sum(g.latency for g in self._connection_states)
            return latencies / len(self._connection_states)
        return float("inf")

    @property
    def latencies(self) -> dict[int, float]:
        """
        Return a dictionary of latencies for all shards.

        Returns:
            {shard_id: latency}

        """
        return {state.shard_id: state.latency for state in self._connection_states}

    async def stop(self) -> None:
        """Shutdown the bot: clear the ready flag, close the http client, then stop every shard."""
        logger.debug("Stopping the bot.")
        self._ready.clear()
        await self.http.close()
        await asyncio.gather(*(state.stop() for state in self._connection_states))

    def get_guild_websocket(self, guild_id: "Snowflake_Type") -> GatewayClient:
        """
        Get the appropriate websocket for a given guild

        Args:
            guild_id: The ID of the guild

        Returns:
            A gateway client for the given ID

        """
        # The owning shard is derived from the guild ID: (id >> 22) % total_shards.
        shard_id = (guild_id >> 22) % self.total_shards
        return next((state for state in self._connection_states if state.shard_id == shard_id), MISSING).gateway

    def get_shards_guild(self, shard_id: int) -> list[Guild]:
        """
        Returns the guilds that the specified shard can see

        Args:
            shard_id: The ID of the shard

        Returns:
            A list of guilds

        """
        return [guild for key, guild in self.cache.guild_cache.items() if ((key >> 22) % self.total_shards) == shard_id]

    @Listener.create()
    async def _on_websocket_ready(self, event: events.RawGatewayEvent) -> None:
        """
        Catches websocket ready and determines when to dispatch the client `READY` signal.

        Args:
            event: The websocket ready packet

        """
        connection_data = event.data
        expected_guilds = {to_snowflake(guild["id"]) for guild in connection_data["guilds"]}
        shard_id, total_shards = connection_data["shard"]
        connection_state = next((state for state in self._connection_states if state.shard_id == shard_id), None)

        if len(expected_guilds) != 0:
            # Wait until every guild announced in the ready packet is cached, or a wait times out.
            while True:
                try:
                    await asyncio.wait_for(self._guild_event.wait(), self.guild_event_timeout)
                except asyncio.TimeoutError:
                    logger.warning("Timeout waiting for guilds cache: Not all guilds will be in cache")
                    break
                self._guild_event.clear()

                if all(self.cache.get_guild(g_id) is not None for g_id in expected_guilds):
                    # all guilds cached
                    break

            if self.fetch_members:
                logger.info(f"Shard {shard_id} is waiting for members to be chunked")
                await asyncio.gather(*(guild.chunked.wait() for guild in self.guilds if guild.id in expected_guilds))
        else:
            logger.warning(
                f"Shard {shard_id} reports it has 0 guilds, this is an indicator you may be using too many shards"
            )

        # noinspection PyProtectedMember
        connection_state._shard_ready.set()
        self.dispatch(ShardConnect(shard_id))
        logger.debug(f"Shard {shard_id} is now ready")

        # The client-level ready logic below only runs once every shard is ready.
        # noinspection PyProtectedMember
        await asyncio.gather(*[shard._shard_ready.wait() for shard in self._connection_states])

        # run any pending startup tasks
        if self.async_startup_tasks:
            try:
                await asyncio.gather(*self.async_startup_tasks)
            except Exception as e:
                self.dispatch(events.Error("async-extension-loader", e))

        # cache slash commands
        if not self._startup:
            await self._init_interactions()

        if not self._ready.is_set():
            self._ready.set()
            if not self._startup:
                self._startup = True
                self.dispatch(events.Startup())
            self.dispatch(events.Ready())

    async def astart(self, token) -> None:
        """
        Asynchronous method to start the bot.

        Shards are grouped by `shard_id % max_start_concurrency`. Groups are started
        concurrently; the shards inside one group are started one after another, each
        waiting for the previous shard to become ready and for at least 5.1 seconds to
        have elapsed since that shard began starting.

        Args:
            token: Your bot's token

        """
        logger.debug("Starting http client...")
        await self.login(token)

        tasks = []

        # Sort shards into their respective ratelimit buckets
        shard_buckets = defaultdict(list)
        for shard in self._connection_states:
            bucket = str(shard.shard_id % self.max_start_concurrency)
            shard_buckets[bucket].append(shard)

        async def start_bucket(bucket_shards: list) -> None:
            # Shards sharing a ratelimit key must not be started at the same time.
            for shard in bucket_shards:
                logger.debug(f"Starting {shard.shard_id}")
                start = time.perf_counter()
                tasks.append(asyncio.create_task(shard.start()))

                # noinspection PyProtectedMember
                await shard._shard_ready.wait()
                await asyncio.sleep(max(0.0, 5.1 - (time.perf_counter() - start)))

        # Different ratelimit keys may start concurrently; once this returns every shard is ready.
        await asyncio.gather(*(start_bucket(bucket_shards) for bucket_shards in shard_buckets.values()))

        try:
            await asyncio.gather(*tasks)
        finally:
            await self.stop()

    async def login(self, token) -> None:
        """
        Login to discord via http.

        !!! note
            You will need to run Naff.start_gateway() before you start receiving gateway events.

        Args:
            token str: Your bot's token

        """
        await super().login(token)
        data = await self.http.get_gateway_bot()

        self.max_start_concurrency = data["session_start_limit"]["max_concurrency"]
        if self.auto_sharding:
            self.total_shards = data["shards"]
        elif data["shards"] != self.total_shards:
            recommended_shards = data["shards"]
            logger.info(
                f"Discord recommends you start with {recommended_shards} shard{'s' if recommended_shards != 1 else ''} instead of {self.total_shards}"
            )

        logger.debug(f"Starting bot with {self.total_shards} shard{'s' if self.total_shards != 1 else ''}")
        # One connection state per shard ID in range(total_shards).
        self._connection_states: list[ConnectionState] = [
            ConnectionState(self, self.intents, shard_id) for shard_id in range(self.total_shards)
        ]
property
readonly
gateway_started: bool
¶
Returns if the gateway has been started in all shards.
property
readonly
shards: list
¶
Returns a list of all shards currently in use.
property
readonly
latency: float
¶
The average latency of all active gateways.
property
readonly
latencies: dict
¶
Return a dictionary of latencies for all shards.
Returns:
Type | Description |
---|---|
dict[int, float] |
A mapping of {shard_id: latency} |
async
method
stop(self)
¶
Shutdown the bot.
Source code in naff/client/auto_shard_client.py
async def stop(self) -> None:
    """Shut the bot down: clear the ready flag, close the http client and stop all shards."""
    logger.debug("Stopping the bot.")
    self._ready.clear()
    await self.http.close()

    # Every shard's stop coroutine is awaited together.
    shutdowns = [shard_state.stop() for shard_state in self._connection_states]
    await asyncio.gather(*shutdowns)
method
get_guild_websocket(self, guild_id)
¶
Get the appropriate websocket for a given guild
Parameters:
Name | Type | Description | Default |
---|---|---|---|
guild_id |
Snowflake_Type |
The ID of the guild |
required |
Returns:
Type | Description |
---|---|
GatewayClient |
A gateway client for the given ID |
Source code in naff/client/auto_shard_client.py
def get_guild_websocket(self, guild_id: "Snowflake_Type") -> GatewayClient:
    """
    Look up the gateway client of the shard responsible for a guild.

    Args:
        guild_id: The ID of the guild

    Returns:
        A gateway client for the given ID

    """
    # The owning shard is derived from the guild ID: (id >> 22) % total_shards.
    wanted_shard = (guild_id >> 22) % self.total_shards
    candidates = (state for state in self._connection_states if state.shard_id == wanted_shard)
    return next(candidates, MISSING).gateway
method
get_shards_guild(self, shard_id)
¶
Returns the guilds that the specified shard can see
Parameters:
Name | Type | Description | Default |
---|---|---|---|
shard_id |
int |
The ID of the shard |
required |
Returns:
Type | Description |
---|---|
list |
A list of guilds |
Source code in naff/client/auto_shard_client.py
def get_shards_guild(self, shard_id: int) -> list[Guild]:
    """
    Collect the cached guilds that belong to the given shard.

    Args:
        shard_id: The ID of the shard

    Returns:
        A list of guilds

    """
    visible = []
    for guild_id, guild in self.cache.guild_cache.items():
        # A guild maps to shard (id >> 22) % total_shards.
        if (guild_id >> 22) % self.total_shards == shard_id:
            visible.append(guild)
    return visible
async
method
astart(self, token)
¶
Asynchronous method to start the bot.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token |
str |
Your bot's token |
required |
Source code in naff/client/auto_shard_client.py
async def astart(self, token) -> None:
    """
    Asynchronous method to start the bot.

    Shards are grouped by `shard_id % max_start_concurrency`. Groups are started
    concurrently; the shards inside one group are started one after another, each
    waiting for the previous shard to become ready and for at least 5.1 seconds to
    have elapsed since that shard began starting.

    Args:
        token: Your bot's token

    """
    logger.debug("Starting http client...")
    await self.login(token)

    tasks = []

    # Sort shards into their respective ratelimit buckets
    shard_buckets = defaultdict(list)
    for shard in self._connection_states:
        bucket = str(shard.shard_id % self.max_start_concurrency)
        shard_buckets[bucket].append(shard)

    async def start_bucket(bucket_shards: list) -> None:
        # Shards sharing a ratelimit key must not be started at the same time.
        for shard in bucket_shards:
            logger.debug(f"Starting {shard.shard_id}")
            start = time.perf_counter()
            tasks.append(asyncio.create_task(shard.start()))

            # noinspection PyProtectedMember
            await shard._shard_ready.wait()
            await asyncio.sleep(max(0.0, 5.1 - (time.perf_counter() - start)))

    # Different ratelimit keys may start concurrently; once this returns every shard is ready.
    await asyncio.gather(*(start_bucket(bucket_shards) for bucket_shards in shard_buckets.values()))

    try:
        await asyncio.gather(*tasks)
    finally:
        await self.stop()
async
method
login(self, token)
¶
Login to discord via http.
Note
You will need to run Naff.start_gateway() before you start receiving gateway events.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token |
str |
Your bot's token |
required |
Source code in naff/client/auto_shard_client.py
async def login(self, token) -> None:
    """
    Log in over http and prepare one connection state per shard.

    !!! note
        You will need to run Naff.start_gateway() before you start receiving gateway events.

    Args:
        token str: Your bot's token

    """
    await super().login(token)
    gateway_info = await self.http.get_gateway_bot()

    self.max_start_concurrency = gateway_info["session_start_limit"]["max_concurrency"]
    recommended_shards = gateway_info["shards"]

    if self.auto_sharding:
        # No shard count was requested, so the recommended count is used.
        self.total_shards = recommended_shards
    elif recommended_shards != self.total_shards:
        logger.info(
            f"Discord recommends you start with {recommended_shards} shard{'s' if recommended_shards != 1 else ''} instead of {self.total_shards}"
        )

    logger.debug(f"Starting bot with {self.total_shards} shard{'s' if self.total_shards != 1 else ''}")

    states: list[ConnectionState] = []
    for shard_id in range(self.total_shards):
        states.append(ConnectionState(self, self.intents, shard_id))
    self._connection_states = states
inherited
property
readonly
is_closed: bool
¶
Returns True if the bot has closed.
inherited
property
readonly
is_ready: bool
¶
Returns True if the bot is ready.
inherited
property
readonly
average_latency: float
¶
Returns the average latency of the websocket connection.
inherited
property
readonly
start_time: datetime
¶
The start time of the bot.
inherited
property
readonly
user: NaffUser
¶
Returns the bot's user.
inherited
property
readonly
app: Application
¶
Returns the bot's application.
inherited
property
readonly
owner: Optional[User]
¶
Returns the bot's owner.
inherited
property
readonly
owners: List[User]
¶
Returns the bot's owners as declared via client.owner_ids.
inherited
property
readonly
guilds: List[Guild]
¶
Returns a list of all guilds the bot is in.
inherited
property
readonly
status: Status
¶
Get the status of the bot.
For example: online, afk, dnd.
inherited
property
readonly
activity: Activity
¶
Get the activity of the bot.
inherited
property
readonly
application_commands: List[naff.models.naff.application_commands.InteractionCommand]
¶
A list of all application commands registered within the bot.
inherited
property
readonly
ws: GatewayClient
¶
Returns the websocket client.
async
inherited
method
generate_prefixes(self, bot, message)
¶
A method to get the bot's default_prefix, can be overridden to add dynamic prefixes.
Note
To easily override this method, simply use the generate_prefixes
parameter when instantiating the client
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bot |
Client |
A reference to the client |
required |
message |
Message |
A message to determine the prefix from. |
required |
Returns:
Type | Description |
---|---|
str | collections.abc.Iterable[str] |
A string or an iterable of strings to use as a prefix. By default, this will return `client.default_prefix` |
Source code in naff/client/auto_shard_client.py
async def generate_prefixes(self, bot: "Client", message: Message) -> str | Iterable[str]:
    """
    Produce the prefix(es) for a message; override this to add dynamic prefixes.

    !!! note
        To easily override this method, simply use the `generate_prefixes` parameter when instantiating the client

    Args:
        bot: A reference to the client
        message: A message to determine the prefix from.

    Returns:
        A string or an iterable of strings to use as a prefix. By default, this will return `client.default_prefix`

    """
    # The default implementation ignores both arguments.
    prefix = self.default_prefix
    return prefix
inherited
method
default_error_handler(source, error)
¶
The default error logging behaviour.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str |
The source of this error |
required |
error |
BaseException |
The exception itself |
required |
Source code in naff/client/auto_shard_client.py
@staticmethod
def default_error_handler(source: str, error: BaseException) -> None:
    """
    Log an error with its traceback; this is the default error logging behaviour.

    Args:
        source: The source of this error
        error: The exception itself

    """
    rendered = traceback.format_exception(error)

    if isinstance(error, HTTPException):
        # HTTPException's are of 3 known formats, we can parse them for human readable errors
        try:
            messages = error.search_for_message(error.errors)
            rendered = f"HTTPException: {error.status}|{error.response.reason}: " + "\n".join(messages)
        except Exception:  # noqa : S110
            # Parsing is best-effort; the plain traceback is logged instead.
            pass

    separator = "\n" if len(rendered) > 1 else " "
    logger.error(
        "Ignoring exception in {}:{}{}".format(source, separator, "".join(rendered)),
    )
async
inherited
method
on_error(self, source, error, *args, **kwargs)
¶
Catches all errors dispatched by the library.
By default it will format and print them to console
Override this to change error handling behaviour
Source code in naff/client/auto_shard_client.py
async def on_error(self, source: str, error: Exception, *args, **kwargs) -> None:
    """
    Handle an error dispatched by the library.

    The default implementation passes `source` and `error` to `default_error_handler`;
    extra positional and keyword arguments are ignored.

    Override this to change error handling behaviour

    """
    handler = self.default_error_handler
    handler(source, error)
async
inherited
method
on_command_error(self, ctx, error, *args, **kwargs)
¶
Catches all errors dispatched by commands.
By default it will call Client.on_error
Override this to change error handling behavior
Source code in naff/client/auto_shard_client.py
async def on_command_error(self, ctx: Context, error: Exception, *args, **kwargs) -> None:
    """
    Handle an error raised by a command.

    An `Error` event is dispatched first. A reply embed is then sent for cooldown,
    max-concurrency and check failures, or, when `send_command_tracebacks` is set,
    the traceback with the bot token redacted.

    Override this to change error handling behavior

    """
    self.dispatch(events.Error(f"cmd /`{ctx.invoke_target}`", error, args, kwargs, ctx))

    try:
        reply = None

        if isinstance(error, errors.CommandOnCooldown):
            reply = Embed(
                description=f"This command is on cooldown!\n"
                f"Please try again in {int(error.cooldown.get_cooldown_time())} seconds",
                color=BrandColors.FUCHSIA,
            )
        elif isinstance(error, errors.MaxConcurrencyReached):
            reply = Embed(
                description="This command has reached its maximum concurrent usage!\n"
                "Please try again shortly.",
                color=BrandColors.FUCHSIA,
            )
        elif isinstance(error, errors.CommandCheckFailure):
            reply = Embed(
                description="You do not have permission to run this command!",
                color=BrandColors.YELLOW,
            )
        elif self.send_command_tracebacks:
            out = "".join(traceback.format_exception(error))
            # Never leak the token into a channel.
            if self.http.token is not None:
                out = out.replace(self.http.token, "[REDACTED TOKEN]")
            reply = Embed(
                title=f"Error: {type(error).__name__}",
                color=BrandColors.RED,
                description=f"```\n{out[:EMBED_MAX_DESC_LENGTH-8]}```",
            )

        if reply is not None:
            await ctx.send(embeds=reply)
    except errors.NaffException:
        # Failing to report the error is deliberately ignored.
        pass
async
inherited
method
on_command(self, ctx)
¶
Called after any command has run.
By default, it will simply log the command, override this to change that behaviour
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ctx |
Context |
The context of the command that was called |
required |
Source code in naff/client/auto_shard_client.py
async def on_command(self, ctx: Context) -> None:
    """
    Called *after* any command has run.

    By default, it will simply log the command, override this to change that behaviour

    Args:
        ctx: The context of the command that was called

    """
    symbol = "?"  # likely custom context
    for context_type, marker in ((PrefixedContext, "@"), (InteractionContext, "/")):
        if isinstance(ctx, context_type):
            symbol = marker
            break

    logger.info(f"Command Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }")
async
inherited
method
on_component_error(self, ctx, error, *args, **kwargs)
¶
Catches all errors dispatched by components.
By default it will call Naff.on_error
Override this to change error handling behavior
Source code in naff/client/auto_shard_client.py
async def on_component_error(self, ctx: ComponentContext, error: Exception, *args, **kwargs) -> None:
    """
    Handle an error raised by a component callback.

    By default an `Error` event describing the component callback is dispatched.

    Override this to change error handling behavior

    """
    error_event = events.Error(f"Component Callback for {ctx.custom_id}", error, args, kwargs, ctx)
    return self.dispatch(error_event)
async
inherited
method
on_component(self, ctx)
¶
Called after any component callback has run.
By default, it will simply log the component use, override this to change that behaviour
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ctx |
ComponentContext |
The context of the component that was called |
required |
Source code in naff/client/auto_shard_client.py
async def on_component(self, ctx: ComponentContext) -> None:
    """
    Called *after* any component callback has run.

    By default, it will simply log the component use, override this to change that behaviour

    Args:
        ctx: The context of the component that was called

    """
    # Marker prepended to the invoke target in the log line.
    symbol = "¢"
    message = f"Component Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }"
    logger.info(message)
async
inherited
method
on_autocomplete_error(self, ctx, error, *args, **kwargs)
¶
Catches all errors dispatched by autocompletion options.
By default it will call Naff.on_error
Override this to change error handling behavior
Source code in naff/client/auto_shard_client.py
async def on_autocomplete_error(self, ctx: AutocompleteContext, error: Exception, *args, **kwargs) -> None:
    """
    Handle an error raised by an autocompletion option.

    By default an `Error` event naming the command and the focussed option is dispatched.

    Override this to change error handling behavior

    """
    source = f"Autocomplete Callback for /{ctx.invoke_target} - Option: {ctx.focussed_option}"
    error_event = events.Error(source, error, args, kwargs, ctx)
    return self.dispatch(error_event)
async
inherited
method
on_autocomplete(self, ctx)
¶
Called after any autocomplete callback has run.
By default, it will simply log the autocomplete callback, override this to change that behaviour
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ctx |
AutocompleteContext |
The context of the command that was called |
required |
Source code in naff/client/auto_shard_client.py
async def on_autocomplete(self, ctx: AutocompleteContext) -> None:
    """
    Called *after* any autocomplete callback has run.

    By default, it will simply log the autocomplete callback, override this to change that behaviour

    Args:
        ctx: The context of the command that was called

    """
    # Marker prepended to the invoke target in the log line.
    symbol = "$"
    message = f"Autocomplete Called: {symbol}{ctx.invoke_target} with {ctx.args = } | {ctx.kwargs = }"
    logger.info(message)
inherited
method
start(self, token)
¶
Start the bot.
Info
This is the recommended method to start the bot
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token |
str |
Your bot's token |
required |
Source code in naff/client/auto_shard_client.py
def start(self, token) -> None:
    """
    Start the bot and block until it stops.

    info:
        This is the recommended method to start the bot

    Args:
        token: Your bot's token

    """
    runner = self.astart(token)
    try:
        asyncio.run(runner)
    except KeyboardInterrupt:
        # Swallowed on purpose: the traceback is useless and can mislead the user.
        pass
async
inherited
method
start_gateway(self)
¶
Starts the gateway connection.
Source code in naff/client/auto_shard_client.py
async def start_gateway(self) -> None:
    """Run the gateway connection; `stop` is always called once it ends or fails."""
    try:
        gateway_run = self._connection_state.start()
        await gateway_run
    finally:
        await self.stop()
inherited
method
dispatch(self, event, *args, **kwargs)
¶
Dispatch an event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
BaseEvent |
The event to be dispatched. |
required |
Source code in naff/client/auto_shard_client.py
def dispatch(self, event: events.BaseEvent, *args, **kwargs) -> None:
    """
    Dispatch an event to its listeners and to any pending waits.

    Args:
        event: The event to be dispatched.

    """
    listeners = self.listeners.get(event.resolved_name, [])
    if listeners:
        logger.debug(f"Dispatching Event: {event.resolved_name}")
        event.bot = self
        for _listen in listeners:
            try:
                self._queue_task(_listen, event, *args, **kwargs)
            except Exception as e:
                raise BotException(
                    f"An error occurred attempting during {event.resolved_name} event processing"
                ) from e

    pending = self.waits.get(event.resolved_name, [])
    if pending:
        # Each wait is called once, in order; those returning a truthy result are
        # dropped from the same list object.
        pending[:] = [waiter for waiter in pending if not waiter(event)]
async
inherited
method
wait_until_ready(self)
¶
Waits for the client to become ready.
Source code in naff/client/auto_shard_client.py
async def wait_until_ready(self) -> None:
    """Block until the client's ready event has been set."""
    ready_event = self._ready
    await ready_event.wait()
inherited
method
wait_for(self, event, checks, timeout)
¶
Waits for a WebSocket event to be dispatched.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Union[str, BaseEvent] |
The name of event to wait. |
required |
checks |
Union[Callable[..., bool], NoneType, naff.client.const.Missing] |
A predicate to check what to wait for. |
Missing |
timeout |
Optional[float] |
The number of seconds to wait before timing out. |
None |
Returns:
Type | Description |
---|---|
Any |
The event object. |
Source code in naff/client/auto_shard_client.py
def wait_for(
    self,
    event: Union[str, "BaseEvent"],
    checks: Absent[Optional[Callable[..., bool]]] = MISSING,
    timeout: Optional[float] = None,
) -> Any:
    """
    Waits for a WebSocket event to be dispatched.

    Args:
        event: The name of event to wait.
        checks: A predicate to check what to wait for.
        timeout: The number of seconds to wait before timing out.

    Returns:
        The event object.

    """
    event_name = get_event_name(event)

    # Register a Wait bound to a fresh future under the event name.
    registered = self.waits.setdefault(event_name, [])
    future = asyncio.Future()
    registered.append(Wait(event_name, checks, future))

    return asyncio.wait_for(future, timeout)
async
inherited
method
wait_for_modal(self, modal, author, timeout)
¶
Wait for a modal response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
modal |
Modal |
The modal we're waiting for. |
required |
author |
Optional[Snowflake_Type] |
The user we're waiting for to reply |
None |
timeout |
Optional[float] |
A timeout in seconds to stop waiting |
None |
Returns:
Type | Description |
---|---|
ModalContext |
The context of the modal response |
Source code in naff/client/auto_shard_client.py
async def wait_for_modal(
    self,
    modal: "Modal",
    author: Optional["Snowflake_Type"] = None,
    timeout: Optional[float] = None,
) -> ModalContext:
    """
    Wait for a modal response.

    Args:
        modal: The modal we're waiting for.
        author: The user we're waiting for to reply
        timeout: A timeout in seconds to stop waiting

    Returns:
        The context of the modal response

    Raises:
        `asyncio.TimeoutError` if no response is received that satisfies the predicate before timeout seconds have passed

    """
    author = to_snowflake(author) if author else None

    def predicate(event) -> bool:
        # Must be a response to this modal...
        if event.context.custom_id != modal.custom_id:
            return False
        # ...and, when an author was given, sent by that author.
        return not (author and author != to_snowflake(event.context.author))

    response = await self.wait_for("modal_response", predicate, timeout)
    return response.context
async
inherited
method
wait_for_component(self, messages, components, check, timeout)
¶
Waits for a component to be sent to the bot.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages |
Union[naff.models.discord.message.Message, int, list] |
The message object to check for. |
None |
components |
Union[List[List[Union[BaseComponent, dict]]], List[Union[BaseComponent, dict]], BaseComponent, dict] |
The components to wait for. |
None |
check |
Optional[Callable] |
A predicate to check what to wait for. |
None |
timeout |
Optional[float] |
The number of seconds to wait before timing out. |
None |
Returns:
Type | Description |
---|---|
Component |
The `Component` that was invoked. Use `.context` to get the `ComponentContext`. |
Source code in naff/client/auto_shard_client.py
async def wait_for_component(
    self,
    messages: Union[Message, int, list] = None,
    components: Optional[
        Union[List[List[Union["BaseComponent", dict]]], List[Union["BaseComponent", dict]], "BaseComponent", dict]
    ] = None,
    check: Optional[Callable] = None,
    timeout: Optional[float] = None,
) -> "Component":
    """
    Waits for a component to be sent to the bot.

    Args:
        messages: The message object to check for.
        components: The components to wait for.
        check: A predicate to check what to wait for.
        timeout: The number of seconds to wait before timing out.

    Returns:
        `Component` that was invoked. Use `.context` to get the `ComponentContext`.

    Raises:
        `asyncio.TimeoutError` if timed out

    """
    if not messages and not components:
        raise ValueError("You must specify messages or components (or both)")

    if isinstance(messages, list):
        message_ids = to_snowflake_list(messages)
    elif messages:
        message_ids = to_snowflake(messages)
    else:
        message_ids = None

    custom_ids = list(get_components_ids(components)) if components else None

    # automatically convert improper custom_ids
    if custom_ids and not all(isinstance(x, str) for x in custom_ids):
        custom_ids = [str(i) for i in custom_ids]

    def _check(event: Component) -> bool:
        ctx: ComponentContext = event.context

        # An empty filter matches everything; otherwise the message / custom id must be listed.
        if message_ids:
            accepted_messages = [message_ids] if isinstance(message_ids, int) else message_ids
            if ctx.message.id not in accepted_messages:
                return False
        if custom_ids and ctx.custom_id not in custom_ids:
            return False

        return check is None or bool(check(event))

    return await self.wait_for("component", checks=_check, timeout=timeout)
inherited
method
listen(self, event_name)
¶
A decorator to be used in situations that Naff can't automatically hook your listeners. Ideally, the standard listen decorator should be used, not this.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_name |
Union[str, naff.client.const.Missing] |
The event name to use, if not the coroutine name |
Missing |
Returns:
Type | Description |
---|---|
Listener |
A listener that can be used to hook into the event. |
Source code in naff/client/auto_shard_client.py
def listen(self, event_name: Absent[str] = MISSING) -> Listener:
    """
    A decorator to be used in situations that Naff can't automatically hook your listeners. Ideally, the standard listen decorator should be used, not this.

    Args:
        event_name: The event name to use, if not the coroutine name

    Returns:
        A listener that can be used to hook into the event.

    """

    def wrapper(coro: Callable[..., Coroutine]) -> Listener:
        # Build the listener from the coroutine, then register it on this client.
        make_listener = Listener.create(event_name)
        created = make_listener(coro)
        self.add_listener(created)
        return created

    return wrapper
inherited
method
add_event_processor(self, event_name)
¶
A decorator to be used to add event processors.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_name |
Union[str, naff.client.const.Missing] |
The event name to use, if not the coroutine name |
Missing |
Returns:
Type | Description |
---|---|
Callable[..., Coroutine] |
A function that can be used to hook into the event. |
Source code in naff/client/auto_shard_client.py
def add_event_processor(self, event_name: Absent[str] = MISSING) -> Callable[..., Coroutine]:
    """
    A decorator to be used to add event processors.

    Args:
        event_name: The event name to use, if not the coroutine name

    Returns:
        A function that can be used to hook into the event.

    """

    def wrapper(coro: Callable[..., Coroutine]) -> Callable[..., Coroutine]:
        raw_name = coro.__name__ if event_name is MISSING else event_name
        # Leading underscores and an "on_" prefix are not part of the processor key.
        key = raw_name.lstrip("_").removeprefix("on_")
        self.processors[key] = coro
        return coro

    return wrapper
inherited
method
add_listener(self, listener)
¶
Add a listener for an event, if no event is passed, one is determined.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
listener |
Listener |
The listener to add to the client |
required |
Source code in naff/client/auto_shard_client.py
def add_listener(self, listener: Listener) -> None:
    """
    Register a listener under its event name.

    A warning is logged when the event has required intents and none of them is enabled.

    Args:
        listener Listener: The listener to add to the client

    """
    # check that the required intents are enabled
    event_class_name = "".join(map(str.capitalize, listener.event.split("_")))
    event_class = globals().get(event_class_name)
    required_intents = _INTENT_EVENTS.get(event_class) if event_class else None  # noqa
    if required_intents and not any(required_intent in self.intents for required_intent in required_intents):
        self.logger.warning(
            f"Event `{listener.event}` will not work since the required intent is not set -> Requires any of: `{required_intents}`"
        )

    self.listeners.setdefault(listener.event, []).append(listener)
inherited
method
add_interaction(self, command)
¶
Add a slash command to the client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command |
InteractionCommand |
The command to add |
required |
Source code in naff/client/auto_shard_client.py
def add_interaction(self, command: InteractionCommand) -> bool:
    """
    Add a slash command to the client.

    Args:
        command InteractionCommand: The command to add

    Returns:
        False when the command has no callback (nothing is registered), True otherwise.

    Raises:
        ValueError: A command with the same resolved name already exists in one of the scopes.

    """
    if self.debug_scope:
        command.scopes = [self.debug_scope]

    # for SlashCommand objs without callback (like objects made to hold group info etc)
    if command.callback is None:
        return False

    for scope in command.scopes:
        scoped_commands = self.interactions.setdefault(scope, {})
        if command.resolved_name in scoped_commands:
            old_cmd = scoped_commands[command.resolved_name]
            raise ValueError(f"Duplicate Command! {scope}::{old_cmd.resolved_name}")

        if self.enforce_interaction_perms:
            command.checks.append(command._permission_enforcer)  # noqa : w0212

        scoped_commands[command.resolved_name] = command

    return True
inherited
method
add_prefixed_command(self, command)
¶
Add a prefixed command to the client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command |
PrefixedCommand |
The command to add |
required |
Source code in naff/client/auto_shard_client.py
def add_prefixed_command(self, command: PrefixedCommand) -> None:
    """
    Add a prefixed command to the client under its name and every alias.

    Args:
        command PrefixedCommand: The command to add

    Raises:
        ValueError: The name or one of the aliases is already registered.

    """
    # check that the required intent is enabled or the prefix is a mention
    default = self.default_prefix
    if isinstance(default, str) or default == MENTION_PREFIX:
        prefixes = (default,)
    else:
        prefixes = default

    if (MENTION_PREFIX not in prefixes) and (Intents.GUILD_MESSAGE_CONTENT not in self.intents):
        self.logger.warning(
            f"Prefixed commands will not work since the required intent is not set -> Requires: `{Intents.GUILD_MESSAGE_CONTENT.__repr__()}` or usage of the default `MENTION_PREFIX` as the prefix"
        )

    command._parse_parameters()

    if self.prefixed_commands.get(command.name):
        raise ValueError(f"Duplicate command! Multiple commands share the name/alias: {command.name}.")
    self.prefixed_commands[command.name] = command

    # Each alias is checked and registered in turn, so earlier aliases stay registered
    # if a later one collides.
    for alias in command.aliases:
        already_registered = self.prefixed_commands.get(alias)
        if already_registered:
            raise ValueError(f"Duplicate command! Multiple commands share the name/alias: {alias}.")
        self.prefixed_commands[alias] = command
inherited
method
add_component_callback(self, command)
¶
Add a component callback to the client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command |
ComponentCommand |
The command to add |
required |
Source code in naff/client/auto_shard_client.py
def add_component_callback(self, command: ComponentCommand) -> None:
    """
    Register a component callback with the client.

    Each entry of `command.listeners` becomes a key in the client's component
    callback mapping, pointing at `command`.

    Args:
        command: The command to add

    Raises:
        ValueError: If one of the listeners already has a callback registered.
    """
    registry = self._component_callbacks
    for listener in command.listeners:
        # one flat mapping entry per listener keeps callback lookup at O(1)
        if listener in registry:
            raise ValueError(f"Duplicate Component! Multiple component callbacks for `{listener}`")
        registry[listener] = command
inherited
method
add_modal_callback(self, command)
¶
Add a modal callback to the client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command |
ModalCommand |
The command to add |
required |
Source code in naff/client/auto_shard_client.py
def add_modal_callback(self, command: ModalCommand) -> None:
    """
    Register a modal callback with the client.

    Each entry of `command.listeners` becomes a key in the client's modal
    callback mapping, pointing at `command`.

    Args:
        command: The command to add

    Raises:
        ValueError: If one of the listeners already has a callback registered.
    """
    registry = self._modal_callbacks
    for listener in command.listeners:
        if listener in registry:
            raise ValueError(f"Duplicate Component! Multiple modal callbacks for `{listener}`")
        registry[listener] = command
async
inherited
method
synchronise_interactions(self, *, scopes, delete_commands)
¶
Synchronise registered interactions with discord.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
scopes |
list |
Optionally specify which scopes are to be synced |
Missing |
delete_commands |
Union[bool, naff.client.const.Missing] |
Override the client setting and delete commands |
Missing |
Source code in naff/client/auto_shard_client.py
async def synchronise_interactions(
    self, *, scopes: list["Snowflake_Type"] = MISSING, delete_commands: Absent[bool] = MISSING
) -> None:
    """
    Synchronise registered interactions with discord.

    For every selected scope the locally registered commands are compared with
    the commands Discord reports for that scope; the scope is overwritten with
    one call only when something is out of date. All scopes are processed
    concurrently and the total duration is written to the debug log.

    Args:
        scopes: Optionally specify which scopes are to be synced
        delete_commands: Override the client setting and delete commands

    Raises:
        InteractionMissingAccess: If `Forbidden` is raised while a scope is being processed
            (other than while fetching its remote commands, which only logs a warning).
    """
    s = time.perf_counter()  # start of the timing window reported in the final debug log
    # an explicit `delete_commands` argument wins over the client-wide setting
    _delete_cmds = self.del_unused_app_cmd if delete_commands is MISSING else delete_commands
    await self._cache_interactions()

    if scopes is not MISSING:
        cmd_scopes = scopes
    # NOTE(review): this branch reads the client setting, not `_delete_cmds`, so a
    # `delete_commands=True` override does not widen the scope list - confirm this is intended.
    elif self.del_unused_app_cmd:
        # if we're deleting unused commands, we check all scopes
        cmd_scopes = [to_snowflake(g_id) for g_id in self._user._guild_ids] + [GLOBAL_SCOPE]
    else:
        # if we're not deleting, just check the scopes we have cmds registered in
        cmd_scopes = list(set(self.interactions) | {GLOBAL_SCOPE})

    local_cmds_json = application_commands_to_dict(self.interactions)

    async def sync_scope(cmd_scope) -> None:
        """Compare one scope with Discord and overwrite it if required."""
        sync_needed_flag = False  # a flag to force this scope to synchronise
        sync_payload = []  # the payload to be pushed to discord

        try:
            try:
                remote_commands = await self.http.get_application_commands(self.app.id, cmd_scope)
            except Forbidden:
                # failing to read the scope is not fatal: warn and leave it untouched
                logger.warning(f"Bot is lacking `application.commands` scope in {cmd_scope}!")
                return

            for local_cmd in self.interactions.get(cmd_scope, {}).values():
                # get remote equivalent of this command
                remote_cmd_json = next(
                    (v for v in remote_commands if int(v["id"]) == local_cmd.cmd_id.get(cmd_scope)), None
                )
                # get json representation of this command
                local_cmd_json = next((c for c in local_cmds_json[cmd_scope] if c["name"] == str(local_cmd.name)))
                # this works by adding any command we *want* on Discord, to a payload, and synchronising that
                # this allows us to delete unused commands, add new commands, or do nothing in 1 or less API calls
                if sync_needed(local_cmd_json, remote_cmd_json):
                    # determine if the local and remote commands are out-of-sync
                    sync_needed_flag = True
                    sync_payload.append(local_cmd_json)
                elif not _delete_cmds and remote_cmd_json:
                    # keep the remote definition, minus the fields listed below
                    _remote_payload = {
                        k: v for k, v in remote_cmd_json.items() if k not in ("id", "application_id", "version")
                    }
                    sync_payload.append(_remote_payload)
                elif _delete_cmds:
                    sync_payload.append(local_cmd_json)

            # drop duplicate payload entries by round-tripping each one through its JSON text;
            # the intermediate set does not preserve the original order
            sync_payload = [json.loads(_dump) for _dump in {json.dumps(_cmd) for _cmd in sync_payload}]

            if sync_needed_flag or (_delete_cmds and len(sync_payload) < len(remote_commands)):
                # synchronise commands if flag is set, or commands are to be deleted
                logger.info(f"Overwriting {cmd_scope} with {len(sync_payload)} application commands")
                sync_response: list[dict] = await self.http.overwrite_application_commands(
                    self.app.id, sync_payload, cmd_scope
                )
                self._cache_sync_response(sync_response, cmd_scope)
            else:
                logger.debug(f"{cmd_scope} is already up-to-date with {len(remote_commands)} commands.")
        except Forbidden as e:
            raise InteractionMissingAccess(cmd_scope) from e
        except HTTPException as e:
            self._raise_sync_exception(e, local_cmds_json, cmd_scope)

    # run every scope concurrently
    await asyncio.gather(*[sync_scope(scope) for scope in cmd_scopes])

    t = time.perf_counter() - s
    logger.debug(f"Sync of {len(cmd_scopes)} scopes took {t} seconds")
inherited
method
get_application_cmd_by_id(self, cmd_id)
¶
Get an application command from the internal cache by its ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cmd_id |
Snowflake_Type |
The ID of the command |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.naff.application_commands.InteractionCommand] |
The command, if one with the given ID exists internally, otherwise None |
Source code in naff/client/auto_shard_client.py
def get_application_cmd_by_id(self, cmd_id: "Snowflake_Type") -> Optional[InteractionCommand]:
    """
    Get an application command from the internal cache by its ID.

    The scope the ID belongs to is looked up in `self._interaction_scopes`;
    only the commands registered in that scope are searched.

    Args:
        cmd_id: The ID of the command

    Returns:
        The command, if one with the given ID exists internally, otherwise None
    """
    scope = self._interaction_scopes.get(str(cmd_id), MISSING)
    cmd_id = int(cmd_id)  # ensure int ID
    if scope is MISSING:
        return None

    # `.get` keeps a scope with no registered commands from raising KeyError
    for cmd in self.interactions.get(scope, {}).values():
        registered_id = cmd.cmd_id.get(scope)
        # a command without an ID in this scope cannot match; int(None) would raise TypeError
        if registered_id is not None and int(registered_id) == cmd_id:
            return cmd
    return None
async
inherited
method
get_context(self, data, interaction)
¶
Return a context object based on data passed.
Note
If you want to use custom context objects, this is the method to override. Your replacement must take the same arguments as this, and return a Context-like object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Union[discord_typings.interactions.receiving.ApplicationCommandGuildInteractionData, discord_typings.interactions.receiving.ApplicationCommandChannelInteractionData, discord_typings.interactions.receiving.GuildUserCommandInteractionData, discord_typings.interactions.receiving.ChannelUserCommandInteractionData, discord_typings.interactions.receiving.ComponentGuildInteractionData, discord_typings.interactions.receiving.ComponentChannelInteractionData, discord_typings.interactions.receiving.AutocompleteGuildInteractionData, discord_typings.interactions.receiving.AutocompleteChannelInteractionData, discord_typings.interactions.receiving.ModalGuildInteractionData, discord_typings.interactions.receiving.ModalChannelInteractionData, dict, naff.models.discord.message.Message] |
The data of the event |
required |
interaction |
bool |
Is this an interaction or not? |
False |
Returns:
Type | Description |
---|---|
naff.models.naff.context.ComponentContext | naff.models.naff.context.AutocompleteContext | naff.models.naff.context.ModalContext | naff.models.naff.context.InteractionContext | naff.models.naff.context.PrefixedContext |
Context object |
Source code in naff/client/auto_shard_client.py
async def get_context(
    self, data: InteractionData | dict | Message, interaction: bool = False
) -> ComponentContext | AutocompleteContext | ModalContext | InteractionContext | PrefixedContext:
    """
    Build the context object that matches the received data.

    note:
        If you want to use custom context objects, this is the method to override. Your replacement must take the same arguments as this, and return a Context-like object.

    Args:
        data: The data of the event
        interaction: Is this an interaction or not?

    Returns:
        Context object
    """
    if not interaction:
        # prefixed commands are built from a message rather than a payload dict
        ctx = self.prefixed_context.from_message(self, data)
        if not ctx.channel:
            ctx.channel = await self.cache.fetch_channel(data._channel_id)
        return ctx

    # pick the context class from the interaction type; anything unrecognised
    # falls back to the generic interaction context
    kind = data["type"]
    if kind == InteractionTypes.MESSAGE_COMPONENT:
        factory = self.component_context
    elif kind == InteractionTypes.AUTOCOMPLETE:
        factory = self.autocomplete_context
    elif kind == InteractionTypes.MODAL_RESPONSE:
        factory = self.modal_context
    else:
        factory = self.interaction_context
    ctx = factory.from_dict(data, self)

    if ctx.channel:
        return ctx

    try:
        ctx.channel = await self.cache.fetch_channel(data["channel_id"])
    except Forbidden:
        # the channel cannot be fetched: substitute a channel built from its ID alone
        ctx.channel = BaseChannel.from_dict_factory(
            {"id": data["channel_id"], "type": ChannelTypes.GUILD_TEXT}, self
        )
    return ctx
inherited
method
get_extensions(self, name)
¶
Get all ext with a name or extension name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the extension, or its extension name |
required |
Returns:
Type | Description |
---|---|
list |
List of Extensions |
Source code in naff/client/auto_shard_client.py
def get_extensions(self, name: str) -> list[Extension]:
    """
    Collect the extensions that match a name.

    A key of `self.ext` takes priority and yields a single-item list;
    otherwise every extension whose `extension_name` equals `name` is returned.

    Args:
        name: The name of the extension, or its extension name

    Returns:
        List of Extensions
    """
    registered = self.ext
    if name in registered:
        return [registered.get(name, None)]
    return [candidate for candidate in registered.values() if candidate.extension_name == name]
inherited
method
get_ext(self, name)
¶
Get an extension with a name or extension name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the extension, or its extension name |
required |
Returns:
Type | Description |
---|---|
naff.models.naff.extension.Extension | None |
An extension, if found |
Source code in naff/client/auto_shard_client.py
def get_ext(self, name: str) -> Extension | None:
    """
    Get the first extension that matches a name.

    Args:
        name: The name of the extension, or its extension name

    Returns:
        The first match reported by `get_extensions`, or None when there is none
    """
    matches = self.get_extensions(name)
    return matches[0] if matches else None
inherited
method
load_extension(self, name, package, **load_kwargs)
¶
Load an extension with given arguments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the extension. |
required |
package |
str |
The package the extension is in |
None |
load_kwargs |
The auto-filled mapping of the load keyword arguments |
{} |
Source code in naff/client/auto_shard_client.py
def load_extension(self, name: str, package: str = None, **load_kwargs) -> None:
    """
    Import an extension module and run its `setup` entry point.

    Args:
        name: The name of the extension.
        package: The package the extension is in
        load_kwargs: The auto-filled mapping of the load keyword arguments

    Raises:
        Exception: If the resolved module name is already loaded.
        ExtensionLoadException: If the module has no `setup` function, or `setup` raised.
    """
    qualified = importlib.util.resolve_name(name, package)
    if qualified in self.__modules:
        raise Exception(f"{qualified} already loaded")

    module = importlib.import_module(qualified, package)
    try:
        entry_point = getattr(module, "setup", None)
        if not entry_point:
            raise ExtensionLoadException(
                f"{qualified} lacks an entry point. Ensure you have a function called `setup` defined in that file"
            ) from None
        entry_point(self, **load_kwargs)
    except ExtensionLoadException:
        raise
    except Exception as e:
        # forget the import so a later attempt imports the module again
        del sys.modules[qualified]
        raise ExtensionLoadException(f"Unexpected Error loading {qualified}") from e

    logger.debug(f"Loaded Extension: {qualified}")
    self.__modules[qualified] = module

    if not (self.sync_ext and self._ready.is_set()):
        return
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # no running event loop, so the sync task cannot be scheduled
        return
    asyncio.create_task(self.synchronise_interactions())
inherited
method
unload_extension(self, name, package, **unload_kwargs)
¶
Unload an extension with given arguments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
The name of the extension. |
required | |
package |
The package the extension is in |
None |
|
unload_kwargs |
The auto-filled mapping of the unload keyword arguments |
{} |
Source code in naff/client/auto_shard_client.py
def unload_extension(self, name, package=None, **unload_kwargs) -> None:
    """
    Unload an extension with given arguments.

    Calls the module's optional `teardown` hook, drops every extension
    registered under the resolved name, and forgets the module so it can be
    imported again later.

    Args:
        name: The name of the extension.
        package: The package the extension is in
        unload_kwargs: The auto-filled mapping of the unload keyword arguments

    Raises:
        ExtensionNotFound: If no extension with the resolved name is loaded.
    """
    name = importlib.util.resolve_name(name, package)
    module = self.__modules.get(name)
    if module is None:
        raise ExtensionNotFound(f"No extension called {name} is loaded")

    # Look the hook up separately from calling it: wrapping the call in
    # `except AttributeError` would also hide AttributeErrors raised by bugs
    # inside the teardown function itself.
    teardown = getattr(module, "teardown", None)
    if teardown is not None:
        teardown(**unload_kwargs)

    for ext in self.get_extensions(name):
        ext.drop(**unload_kwargs)

    del sys.modules[name]
    del self.__modules[name]

    if self.sync_ext and self._ready.is_set():
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # no running event loop, so the sync task cannot be scheduled
            return
        asyncio.create_task(self.synchronise_interactions())
inherited
method
reload_extension(self, name, package, *, load_kwargs, unload_kwargs)
¶
Helper method to reload an extension. Simply unloads, then loads the extension with given arguments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
The name of the extension. |
required | |
package |
The package the extension is in |
None |
|
load_kwargs |
Mapping[str, Any] |
The manually-filled mapping of the load keyword arguments |
None |
unload_kwargs |
Mapping[str, Any] |
The manually-filled mapping of the unload keyword arguments |
None |
Source code in naff/client/auto_shard_client.py
def reload_extension(
    self, name, package=None, *, load_kwargs: Mapping[str, Any] = None, unload_kwargs: Mapping[str, Any] = None
) -> None:
    """
    Helper method to reload an extension. Simply unloads, then loads the extension with given arguments.

    If the extension is not currently loaded it is loaded instead, still
    honouring `load_kwargs`.

    Args:
        name: The name of the extension.
        package: The package the extension is in
        load_kwargs: The manually-filled mapping of the load keyword arguments
        unload_kwargs: The manually-filled mapping of the unload keyword arguments
    """
    # treat missing/empty mappings as "no extra arguments"
    if not load_kwargs:
        load_kwargs = {}
    if not unload_kwargs:
        unload_kwargs = {}

    name = importlib.util.resolve_name(name, package)
    module = self.__modules.get(name)
    if module is None:
        logger.warning("Attempted to reload extension thats not loaded. Loading extension instead")
        # forward load_kwargs here too; they were previously dropped on this path
        return self.load_extension(name, package, **load_kwargs)

    self.unload_extension(name, package, **unload_kwargs)
    self.load_extension(name, package, **load_kwargs)
    # todo: maybe add an ability to revert to the previous version if unable to load the new one
async
inherited
method
fetch_guild(self, guild_id)
¶
Fetch a guild.
Note
This method is an alias for the cache which will either return a cached object, or query discord for the object if it's not already cached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
guild_id |
Snowflake_Type |
The ID of the guild to get |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.guild.Guild] |
Guild Object if found, otherwise None |
Source code in naff/client/auto_shard_client.py
async def fetch_guild(self, guild_id: "Snowflake_Type") -> Optional[Guild]:
    """
    Retrieve a guild by ID through the cache.

    Note:
        The lookup is delegated to the cache's `fetch_guild`; a `NotFound`
        raised by it is turned into `None`.

    Args:
        guild_id: The ID of the guild to get

    Returns:
        Guild Object if found, otherwise None
    """
    try:
        guild = await self.cache.fetch_guild(guild_id)
    except NotFound:
        guild = None
    return guild
inherited
method
get_guild(self, guild_id)
¶
Get a guild.
Note
This method is an alias for the cache which will return a cached object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
guild_id |
Snowflake_Type |
The ID of the guild to get |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.guild.Guild] |
Guild Object if found, otherwise None |
Source code in naff/client/auto_shard_client.py
def get_guild(self, guild_id: "Snowflake_Type") -> Optional[Guild]:
    """
    Look a guild up in the cache.

    Note:
        This is a thin alias for the cache's `get_guild`; its result is returned unchanged.

    Args:
        guild_id: The ID of the guild to get

    Returns:
        Guild Object if found, otherwise None
    """
    cached_guild = self.cache.get_guild(guild_id)
    return cached_guild
async
inherited
method
create_guild_from_template(self, template_code, name, icon)
¶
Creates a new guild based on a template.
Note
This endpoint can only be used by bots in less than 10 guilds.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
template_code |
Union[GuildTemplate, str] |
The code of the template to use. |
required |
name |
str |
The name of the guild (2-100 characters) |
required |
icon |
Union[naff.models.discord.file.File, io.IOBase, BinaryIO, pathlib.Path, str, naff.client.const.Missing] |
Location or File of icon to set |
Missing |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.guild.Guild] |
The newly created guild object |
Source code in naff/client/auto_shard_client.py
async def create_guild_from_template(
    self,
    template_code: Union["GuildTemplate", str],
    name: str,
    icon: Absent[UPLOADABLE_TYPE] = MISSING,
) -> Optional[Guild]:
    """
    Create a new guild from a guild template.

    note:
        This endpoint can only be used by bots in less than 10 guilds.

    Args:
        template_code: The code of the template to use.
        name: The name of the guild (2-100 characters)
        icon: Location or File of icon to set

    Returns:
        The newly created guild object
    """
    # accept either a template object or its raw code
    code = template_code.code if isinstance(template_code, GuildTemplate) else template_code
    # a falsy icon is forwarded untouched; anything else is converted first
    icon_payload = to_image_data(icon) if icon else icon
    guild_data = await self.http.create_guild_from_guild_template(code, name, icon_payload)
    return Guild.from_dict(guild_data, self)
async
inherited
method
fetch_channel(self, channel_id)
¶
Fetch a channel.
Note
This method is an alias for the cache which will either return a cached object, or query discord for the object if it's not already cached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
channel_id |
Snowflake_Type |
The ID of the channel to get |
required |
Returns:
Type | Description |
---|---|
Optional[TYPE_ALL_CHANNEL] |
Channel Object if found, otherwise None |
Source code in naff/client/auto_shard_client.py
async def fetch_channel(self, channel_id: "Snowflake_Type") -> Optional["TYPE_ALL_CHANNEL"]:
    """
    Retrieve a channel by ID through the cache.

    Note:
        The lookup is delegated to the cache's `fetch_channel`; a `NotFound`
        raised by it is turned into `None`.

    Args:
        channel_id: The ID of the channel to get

    Returns:
        Channel Object if found, otherwise None
    """
    try:
        channel = await self.cache.fetch_channel(channel_id)
    except NotFound:
        channel = None
    return channel
inherited
method
get_channel(self, channel_id)
¶
Get a channel.
Note
This method is an alias for the cache which will return a cached object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
channel_id |
Snowflake_Type |
The ID of the channel to get |
required |
Returns:
Type | Description |
---|---|
Optional[TYPE_ALL_CHANNEL] |
Channel Object if found, otherwise None |
Source code in naff/client/auto_shard_client.py
def get_channel(self, channel_id: "Snowflake_Type") -> Optional["TYPE_ALL_CHANNEL"]:
    """
    Look a channel up in the cache.

    Note:
        This is a thin alias for the cache's `get_channel`; its result is returned unchanged.

    Args:
        channel_id: The ID of the channel to get

    Returns:
        Channel Object if found, otherwise None
    """
    cached_channel = self.cache.get_channel(channel_id)
    return cached_channel
async
inherited
method
fetch_user(self, user_id)
¶
Fetch a user.
Note
This method is an alias for the cache which will either return a cached object, or query discord for the object if it's not already cached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
Snowflake_Type |
The ID of the user to get |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.user.User] |
User Object if found, otherwise None |
Source code in naff/client/auto_shard_client.py
async def fetch_user(self, user_id: "Snowflake_Type") -> Optional[User]:
    """
    Retrieve a user by ID through the cache.

    Note:
        The lookup is delegated to the cache's `fetch_user`; a `NotFound`
        raised by it is turned into `None`.

    Args:
        user_id: The ID of the user to get

    Returns:
        User Object if found, otherwise None
    """
    try:
        user = await self.cache.fetch_user(user_id)
    except NotFound:
        user = None
    return user
inherited
method
get_user(self, user_id)
¶
Get a user.
Note
This method is an alias for the cache which will return a cached object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
Snowflake_Type |
The ID of the user to get |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.user.User] |
User Object if found, otherwise None |
Source code in naff/client/auto_shard_client.py
def get_user(self, user_id: "Snowflake_Type") -> Optional[User]:
    """
    Look a user up in the cache.

    Note:
        This is a thin alias for the cache's `get_user`; its result is returned unchanged.

    Args:
        user_id: The ID of the user to get

    Returns:
        User Object if found, otherwise None
    """
    cached_user = self.cache.get_user(user_id)
    return cached_user
async
inherited
method
fetch_member(self, user_id, guild_id)
¶
Fetch a member from a guild.
Note
This method is an alias for the cache which will either return a cached object, or query discord for the object if it's not already cached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
Snowflake_Type |
The ID of the member |
required |
guild_id |
Snowflake_Type |
The ID of the guild to get the member from |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.user.Member] |
Member object if found, otherwise None |
Source code in naff/client/auto_shard_client.py
async def fetch_member(self, user_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[Member]:
    """
    Retrieve a guild member through the cache.

    Note:
        The lookup is delegated to the cache's `fetch_member`, which takes the
        guild ID first; a `NotFound` raised by it is turned into `None`.

    Args:
        user_id: The ID of the member
        guild_id: The ID of the guild to get the member from

    Returns:
        Member object if found, otherwise None
    """
    try:
        member = await self.cache.fetch_member(guild_id, user_id)
    except NotFound:
        member = None
    return member
inherited
method
get_member(self, user_id, guild_id)
¶
Get a member from a guild.
Note
This method is an alias for the cache which will return a cached object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
Snowflake_Type |
The ID of the member |
required |
guild_id |
Snowflake_Type |
The ID of the guild to get the member from |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.user.Member] |
Member object if found, otherwise None |
Source code in naff/client/auto_shard_client.py
def get_member(self, user_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[Member]:
    """
    Look a guild member up in the cache.

    Note:
        This is a thin alias for the cache's `get_member`, which takes the
        guild ID first; its result is returned unchanged.

    Args:
        user_id: The ID of the member
        guild_id: The ID of the guild to get the member from

    Returns:
        Member object if found, otherwise None
    """
    cached_member = self.cache.get_member(guild_id, user_id)
    return cached_member
async
inherited
method
fetch_scheduled_event(self, guild_id, scheduled_event_id, with_user_count)
¶
Fetch a scheduled event by id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
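guild_id |
Snowflake_Type |
The id of the guild the scheduled event belongs to. |
required |
scheduled_event_id |
Snowflake_Type |
The id of the scheduled event. |
required |
with_user_count |
bool |
Whether to include the number of users subscribed to the event. |
False |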
Returns:
Type | Description |
---|---|
Optional[ScheduledEvent] |
The scheduled event if found, otherwise None |
Source code in naff/client/auto_shard_client.py
async def fetch_scheduled_event(
    self, guild_id: "Snowflake_Type", scheduled_event_id: "Snowflake_Type", with_user_count: bool = False
) -> Optional["ScheduledEvent"]:
    """
    Fetch a scheduled event by id.

    Args:
        guild_id: The id of the guild, forwarded to the HTTP request.
        scheduled_event_id: The id of the scheduled event.
        with_user_count: Forwarded unchanged to the HTTP request.

    Returns:
        The scheduled event if found, otherwise None
    """
    try:
        event_data = await self.http.get_scheduled_event(guild_id, scheduled_event_id, with_user_count)
        event = ScheduledEvent.from_dict(event_data, self)
    except NotFound:
        event = None
    return event
async
inherited
method
fetch_custom_emoji(self, emoji_id, guild_id)
¶
Fetch a custom emoji by id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
emoji_id |
Snowflake_Type |
The id of the custom emoji. |
required |
guild_id |
Snowflake_Type |
The id of the guild the emoji belongs to. |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.emoji.CustomEmoji] |
The custom emoji if found, otherwise None. |
Source code in naff/client/auto_shard_client.py
async def fetch_custom_emoji(self, emoji_id: "Snowflake_Type", guild_id: "Snowflake_Type") -> Optional[CustomEmoji]:
    """
    Retrieve a custom emoji through the cache.

    The lookup is delegated to the cache's `fetch_emoji`, which takes the
    guild id first; a `NotFound` raised by it is turned into `None`.

    Args:
        emoji_id: The id of the custom emoji.
        guild_id: The id of the guild the emoji belongs to.

    Returns:
        The custom emoji if found, otherwise None.
    """
    try:
        emoji = await self.cache.fetch_emoji(guild_id, emoji_id)
    except NotFound:
        emoji = None
    return emoji
inherited
method
get_custom_emoji(self, emoji_id, guild_id)
¶
Get a custom emoji by id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
emoji_id |
Snowflake_Type |
The id of the custom emoji. |
required |
guild_id |
Optional[Snowflake_Type] |
The id of the guild the emoji belongs to. |
None |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.emoji.CustomEmoji] |
The custom emoji if found, otherwise None. |
Source code in naff/client/auto_shard_client.py
def get_custom_emoji(
    self, emoji_id: "Snowflake_Type", guild_id: Optional["Snowflake_Type"] = None
) -> Optional[CustomEmoji]:
    """
    Look a custom emoji up in the cache.

    Args:
        emoji_id: The id of the custom emoji.
        guild_id: The id of the guild the emoji belongs to. When given, the cached emoji is
            only returned if its guild id matches.

    Returns:
        The custom emoji if found, otherwise None.
    """
    emoji = self.cache.get_emoji(emoji_id)
    if not emoji:
        return None
    if not guild_id:
        # no guild filter requested
        return emoji
    return emoji if emoji._guild_id == to_snowflake(guild_id) else None
async
inherited
method
fetch_sticker(self, sticker_id)
¶
Fetch a sticker by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sticker_id |
Snowflake_Type |
The ID of the sticker. |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.discord.sticker.Sticker] |
A sticker object if found, otherwise None |
Source code in naff/client/auto_shard_client.py
async def fetch_sticker(self, sticker_id: "Snowflake_Type") -> Optional[Sticker]:
    """
    Fetch a sticker by ID.

    Args:
        sticker_id: The ID of the sticker.

    Returns:
        A sticker object if found, otherwise None
    """
    try:
        payload = await self.http.get_sticker(sticker_id)
        sticker = Sticker.from_dict(payload, self)
    except NotFound:
        sticker = None
    return sticker
async
inherited
method
fetch_nitro_packs(self)
¶
List the sticker packs available to Nitro subscribers.
Returns:
Type | Description |
---|---|
Optional[List[StickerPack]] |
A list of StickerPack objects if found, otherwise returns None |
Source code in naff/client/auto_shard_client.py
async def fetch_nitro_packs(self) -> Optional[List["StickerPack"]]:
    """
    List the sticker packs available to Nitro subscribers.

    Returns:
        A list of StickerPack objects if found, otherwise returns None
    """
    try:
        raw_packs = await self.http.list_nitro_sticker_packs()
        packs = []
        for pack_data in raw_packs:
            packs.append(StickerPack.from_dict(pack_data, self))
    except NotFound:
        return None
    return packs
async
inherited
method
fetch_voice_regions(self)
¶
List the voice regions available on Discord.
Returns:
Type | Description |
---|---|
List[VoiceRegion] |
A list of voice regions. |
Source code in naff/client/auto_shard_client.py
async def fetch_voice_regions(self) -> List["VoiceRegion"]:
    """
    List the voice regions available on Discord.

    Returns:
        A list of voice regions.
    """
    return VoiceRegion.from_list(await self.http.list_voice_regions())
async
inherited
method
connect_to_vc(self, guild_id, channel_id, muted, deafened)
¶
Connect the bot to a voice channel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
guild_id |
Snowflake_Type |
id of the guild the voice channel is in. |
required |
channel_id |
Snowflake_Type |
id of the voice channel client wants to join. |
required |
muted |
bool |
Whether the bot should be muted when connected. |
False |
deafened |
bool |
Whether the bot should be deafened when connected. |
False |
Returns:
Type | Description |
---|---|
ActiveVoiceState |
The new active voice state on successful connection. |
Source code in naff/client/auto_shard_client.py
async def connect_to_vc(
    self, guild_id: "Snowflake_Type", channel_id: "Snowflake_Type", muted: bool = False, deafened: bool = False
) -> ActiveVoiceState:
    """
    Connect the bot to a voice channel.

    The request is handed to the client's connection state (`voice_connect`).

    Args:
        guild_id: id of the guild the voice channel is in.
        channel_id: id of the voice channel client wants to join.
        muted: Whether the bot should be muted when connected.
        deafened: Whether the bot should be deafened when connected.

    Returns:
        The new active voice state on successful connection.
    """
    connection = self._connection_state
    voice_state = await connection.voice_connect(guild_id, channel_id, muted, deafened)
    return voice_state
inherited
method
get_bot_voice_state(self, guild_id)
¶
Get the bot's voice state for a guild.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
guild_id |
Snowflake_Type |
The target guild's id. |
required |
Returns:
Type | Description |
---|---|
Optional[naff.models.naff.active_voice_state.ActiveVoiceState] |
The bot's voice state for the guild if connected, otherwise None. |
Source code in naff/client/auto_shard_client.py
def get_bot_voice_state(self, guild_id: "Snowflake_Type") -> Optional[ActiveVoiceState]:
    """
    Get the bot's voice state for a guild.

    The lookup is handed to the client's connection state (`get_voice_state`).

    Args:
        guild_id: The target guild's id.

    Returns:
        The bot's voice state for the guild if connected, otherwise None.
    """
    connection = self._connection_state
    return connection.get_voice_state(guild_id)
async
inherited
method
change_presence(self, status, activity)
¶
Change the bot's presence.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
status |
Union[str, naff.models.discord.enums.Status] |
The status for the bot to be. i.e. online, afk, etc. |
<Status.ONLINE: 'online'> |
activity |
Union[naff.models.discord.activity.Activity, str] |
The activity for the bot to be displayed as doing. |
None |
Note
Bots may only be `playing`, `streaming`, `listening`, `watching` or `competing`; other activity types are likely to fail.
Source code in naff/client/auto_shard_client.py
async def change_presence(
    self, status: Optional[Union[str, Status]] = Status.ONLINE, activity: Optional[Union[Activity, str]] = None
) -> None:
    """
    Change the bot's presence.

    The request is handed to the client's connection state (`change_presence`).

    Args:
        status: The status for the bot to be. i.e. online, afk, etc.
        activity: The activity for the bot to be displayed as doing.

    Note:
        Bots may only be `playing`, `streaming`, `listening`, `watching` or `competing`; other activity types are likely to fail.
    """
    connection = self._connection_state
    await connection.change_presence(status, activity)